import reprlib
from collections import UserDict
from collections.abc import Iterable

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.utils.validation import check_is_fitted

from . import _dataframe as sbd
from . import _utils
from . import selectors as s
from ._apply_to_cols import SingleColumnTransformer
from ._check_input import CheckInputDataFrame
from ._clean_categories import CleanCategories
from ._clean_null_strings import CleanNullStrings
from ._datetime_encoder import DatetimeEncoder
from ._drop_uninformative import DropUninformative
from ._select_cols import Drop
from ._sklearn_compat import _VisualBlock
from ._string_encoder import StringEncoder
from ._to_datetime import ToDatetime
from ._to_float import ToFloat
from ._to_str import ToStr
from ._wrap_transformer import wrap_transformer

__all__ = ["TableVectorizer"]


class PassThrough(SingleColumnTransformer):
    # Identity transformer: both entry points hand the input column back
    # untouched. It backs the "passthrough" option and the default
    # ``numeric`` transformer of the ``TableVectorizer``.

    def transform(self, column):
        """Return ``column`` unchanged."""
        return column

    def fit_transform(self, column, y=None):
        """Return ``column`` unchanged; nothing is learned and ``y`` is unused."""
        return column


# Module-level default encoders, one per kind of column. They are the default
# values of the corresponding ``TableVectorizer.__init__`` parameters.

# Numeric columns are forwarded as they are.
NUMERIC_TRANSFORMER = PassThrough()

# Date and datetime columns.
DATETIME_TRANSFORMER = DatetimeEncoder()

# String / categorical columns with few unique values: dense float32 one-hot
# encoding, unknown categories ignored, a single output for binary features.
LOW_CARDINALITY_TRANSFORMER = OneHotEncoder(
    drop="if_binary",
    handle_unknown="ignore",
    dtype="float32",
    sparse_output=False,
)

# String / categorical columns with many unique values: 30 components per input.
HIGH_CARDINALITY_TRANSFORMER = StringEncoder(n_components=30)


class ShortReprDict(UserDict):
    """A mapping whose repr only displays its first two items.

    Lookups and updates behave as for any ``UserDict``; only ``repr`` is
    abbreviated, with the remaining items replaced by ``...``.

    Examples
    --------
    >>> d = {'one': 1, 'two': 2, 'three': 3, 'four': 4}
    >>> d
    {'one': 1, 'two': 2, 'three': 3, 'four': 4}
    >>> from skrub._table_vectorizer import ShortReprDict
    >>> ShortReprDict(d)
    {'four': 4, 'one': 1, ...}
    >>> _['two']
    2
    """

    def __repr__(self):
        """Return the ``reprlib`` rendering of the content, capped at 2 items."""
        abbreviator = reprlib.Repr()
        # Show at most two key/value pairs before the ellipsis.
        abbreviator.maxdict = 2
        as_plain_dict = dict(self)
        return abbreviator.repr(as_plain_dict)


def _created_by_predicate(col, transformers):
    """Tell whether ``col`` is an output of one of the fitted ``transformers``.

    A column matches as soon as its name is found in the ``created_outputs_``
    attribute of any transformer; the remaining ones are then not inspected.
    """
    for transformer in transformers:
        if sbd.name(col) in transformer.created_outputs_:
            return True
    return False


def _created_by(*transformers):
    """Build a selector matching the columns produced by ``transformers``.

    ``transformers`` are expected to be ``ApplyToCols`` instances. A column is
    selected when its name appears in the ``created_outputs_`` fitted attribute
    of at least one of them, i.e. when one of them created (or modified) it.

    .. note::

        The selector keeps references to the ``transformers`` objects
        themselves. Cloning them does not update those references, so a
        selector pointing to earlier steps of a pipeline stops working once
        that pipeline is cloned: it would keep inspecting the original
        transformers rather than the clones. This is not a problem for the
        ``TableVectorizer``, which only uses ``_created_by`` inside its
        private ``_pipeline`` attribute; that pipeline is built and fitted
        during ``TableVectorizer.fit`` and never cloned. ``_created_by`` is a
        private helper of the ``TableVectorizer``; it is not meant for general
        use and should not be moved to the ``skrub.selectors`` module.
    """
    description = f"created_by(<any of {len(transformers)} transformers>)"
    return s.Filter(
        _created_by_predicate,
        args=(transformers,),
        selector_repr=description,
    )


def _check_transformer(transformer):
    if isinstance(transformer, str):
        if transformer == "passthrough":
            return PassThrough()
        if transformer == "drop":
            return Drop()
        raise ValueError(
            f"Value not understood: {transformer!r}. Please provide either"
            " 'passthrough', 'drop', or a scikit-learn transformer."
        )
    return clone(transformer)


def _get_preprocessors(
    *,
    cols,
    drop_null_fraction,
    drop_if_unique,
    drop_if_constant,
    n_jobs,
    add_tofloat32=True,
    datetime_format=None,
):
    """Build the list of cleaning steps shared by ``Cleaner`` and ``TableVectorizer``.

    Parameters
    ----------
    cols : selector
        The columns the cleaning transformers are applied to.
    drop_null_fraction, drop_if_unique, drop_if_constant
        Forwarded to ``DropUninformative``.
    n_jobs : int or None
        Forwarded to ``wrap_transformer`` for every step.
    add_tofloat32 : bool, default=True
        Whether a ``ToFloat`` step is inserted after ``ToDatetime``.
    datetime_format : str or None, default=None
        Forwarded to ``ToDatetime`` as ``format``.

    Returns
    -------
    list
        A ``CheckInputDataFrame`` followed by one wrapped, column-wise step for
        each of: ``CleanNullStrings``, ``DropUninformative``, ``ToDatetime``,
        optionally ``ToFloat``, ``CleanCategories`` and ``ToStr``.
    """
    steps = [CheckInputDataFrame()]

    column_transformers = [
        CleanNullStrings(),
        DropUninformative(
            drop_null_fraction=drop_null_fraction,
            drop_if_constant=drop_if_constant,
            drop_if_unique=drop_if_unique,
        ),
        ToDatetime(format=datetime_format),
    ]
    if add_tofloat32:
        column_transformers.append(ToFloat())
    column_transformers.extend([CleanCategories(), ToStr()])

    # Each transformer is applied column by column and is allowed to reject
    # the columns it does not handle.
    steps.extend(
        wrap_transformer(
            column_transformer,
            cols,
            allow_reject=True,
            n_jobs=n_jobs,
            columnwise=True,
        )
        for column_transformer in column_transformers
    )
    return steps


class Cleaner(TransformerMixin, BaseEstimator):
    """Column-wise consistency checks and sanitization of dtypes, null values and dates.

    The ``Cleaner`` performs some consistency checks and basic preprocessing
    such as detecting null values represented as strings (e.g. ``'N/A'``), parsing
    dates, and removing uninformative columns. See the "Notes" section for a full list.

    Parameters
    ----------
    drop_null_fraction : float or None, default=1.0
        Fraction of null above which the column is dropped. If ``drop_null_fraction``
        is set to ``1.0``, the column is dropped if it contains only
        nulls or NaNs (this is the default behavior). If ``drop_null_fraction`` is a
        number in ``[0.0, 1.0)``, the column is dropped if the fraction of nulls
        is strictly larger than ``drop_null_fraction``. If ``drop_null_fraction`` is
        ``None``, this selection is disabled: no columns are dropped based on the
        number of null values they contain.

    drop_if_constant : bool, default=False
        If set to true, drop columns that contain a single unique value. Note that
        missing values are considered as one additional distinct value.

    drop_if_unique : bool, default=False
        If set to true, drop columns that contain only unique values, i.e., the number
        of unique values is equal to the number of rows in the column. Numeric columns
        are never dropped.

    datetime_format : str, default=None
        The format to use when parsing dates. If None, the format is inferred.

    numeric_dtype : "float32" or None, default=None
        If set to ``float32``, convert columns with numerical information
        to ``np.float32`` dtype thanks to the transformer ``ToFloat``.
        If ``None``, numerical columns are not modified.

    n_jobs : int or None, default=1
        Number of jobs to run in parallel.
        ``None`` means 1 unless in a joblib ``parallel_backend`` context.
        ``-1`` means using all processors.

    Attributes
    ----------
    all_processing_steps_ : dict
        Maps the name of each column to a list of all the processing steps that were
        applied to it.

    all_outputs_ : list of str
        Column names of the output of ``transform``.

    See Also
    --------
    TableVectorizer :
        Process columns of a dataframe and convert them to a numeric (vectorized)
        representation.

    ToFloat :
        Convert numeric columns to ``np.float32``, to have consistent numeric
        types and representation of missing values. More informative columns (e.g.,
        categorical or datetime) are not converted.

    ApplyToCols :
        Apply a given transformer separately to each column in a selection of columns.
        Useful to complement the default heuristics of the ``Cleaner``.

    ApplyToFrame :
        Apply a given transformer jointly to all columns in a selection of columns.
        Useful to complement the default heuristics of the ``Cleaner``.

    Notes
    -----
    The ``Cleaner`` performs the following set of transformations on each column:

    - ``CleanNullStrings()``: replace strings used to represent missing values
      with NA markers.

    - ``DropUninformative()``: drop the column if it is considered to be
      "uninformative". A column is considered to be "uninformative" if it contains
      only missing values (``drop_null_fraction``), only a constant value
      (``drop_if_constant``), or if all values are distinct (``drop_if_unique``).
      By default, the ``Cleaner`` keeps all columns, unless they contain only
      missing values.
      Note that setting ``drop_if_unique`` to ``True`` may lead to dropping columns
      that contain text.

    - ``ToDatetime()``: parse datetimes represented as strings and return them as
      actual datetimes with the correct dtype. If ``datetime_format`` is provided,
      it is forwarded to ``ToDatetime()``. Otherwise, the format is inferred.

    - ``CleanCategories()``: process categorical columns depending on the dataframe
      library (Pandas or Polars) to force consistent typing and avoid issues downstream.

    - ``ToStr()``: convert columns to strings, unless they are numerical,
      categorical, or datetime.

    If ``numeric_dtype`` is set to ``float32``, the ``Cleaner`` will also convert
    numeric columns to this dtype, including numbers represented
    as string, ensuring a consistent representation
    of numbers and missing values. This can be useful if the ``Cleaner``
    is used as a preprocessing step in a skrub pipeline.

    Examples
    --------
    >>> from skrub import Cleaner
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     'A': ['one', 'two', 'two', 'three'],
    ...     'B': ['02/02/2024', '23/02/2024', '12/03/2024', '13/03/2024'],
    ...     'C': ['1.5', 'N/A', '12.2', 'N/A'],
    ...     'D': [1.5, 2.0, 2.5, 3.0],
    ... })
    >>> df
           A           B     C    D
    0    one  02/02/2024   1.5  1.5
    1    two  23/02/2024   N/A  2.0
    2    two  12/03/2024  12.2  2.5
    3  three  13/03/2024   N/A  3.0
    >>> df.dtypes # doctest: +SKIP
    A       ...
    B       ...
    C       ...
    D   float64
    dtype: object

    The Cleaner will parse datetime columns and convert nulls to dtypes
    suitable to those of the column (e.g., ``np.nan`` for numerical columns).

    >>> cleaner = Cleaner()
    >>> cleaner.fit_transform(df)
           A          B     C    D
    0    one 2024-02-02   1.5  1.5
    1    two 2024-02-23   NaN  2.0
    2    two 2024-03-12  12.2  2.5
    3  three 2024-03-13   NaN  3.0

    >>> cleaner.fit_transform(df).dtypes  # doctest: +SKIP
    A               ...
    B    datetime64[ns]
    C               ...
    D           float64
    dtype: object

    We can inspect all the processing steps that were applied to a given column:

    >>> cleaner.all_processing_steps_['A']
    [CleanNullStrings(), DropUninformative(), ToStr()]
    >>> cleaner.all_processing_steps_['B']
    [CleanNullStrings(), DropUninformative(), ToDatetime()]
    >>> cleaner.all_processing_steps_['C']
    [CleanNullStrings(), DropUninformative(), ToStr()]
    >>> cleaner.all_processing_steps_['D']
    [DropUninformative()]
    """

    def __init__(
        self,
        drop_null_fraction=1.0,
        drop_if_constant=False,
        drop_if_unique=False,
        datetime_format=None,
        numeric_dtype=None,
        n_jobs=1,
    ):
        self.drop_null_fraction = drop_null_fraction
        self.drop_if_constant = drop_if_constant
        self.drop_if_unique = drop_if_unique
        self.datetime_format = datetime_format
        self.numeric_dtype = numeric_dtype
        self.n_jobs = n_jobs

    def fit_transform(self, X, y=None):
        """Fit transformer and transform dataframe.

        Parameters
        ----------
        X : dataframe of shape (n_samples, n_features)
            Input data to transform.

        y : array-like of shape (n_samples,) or (n_samples, n_outputs) or None, \
                default=None
            Target values for supervised learning (None for unsupervised
            transformations).

        Returns
        -------
        dataframe
            The transformed input.

        Raises
        ------
        ValueError
            If ``numeric_dtype`` is neither ``None`` nor ``"float32"``.
        """
        # Validate the parameter before building anything.
        if self.numeric_dtype not in (None, "float32"):
            raise ValueError(
                "`numeric_dtype` must be one of "
                f"[`None`, `'float32'`]. Found {self.numeric_dtype}."
            )
        add_tofloat32 = self.numeric_dtype == "float32"

        all_steps = _get_preprocessors(
            cols=s.all(),
            drop_null_fraction=self.drop_null_fraction,
            drop_if_constant=self.drop_if_constant,
            drop_if_unique=self.drop_if_unique,
            n_jobs=self.n_jobs,
            add_tofloat32=add_tofloat32,
            datetime_format=self.datetime_format,
        )
        self._pipeline = make_pipeline(*all_steps)
        result = self._pipeline.fit_transform(X)
        self.all_outputs_ = sbd.column_names(result)
        # The first step is the ``CheckInputDataFrame``; the names it outputs
        # are the (cleaned) input column names.
        input_names = all_steps[0].feature_names_out_
        self.all_processing_steps_ = {col: [] for col in input_names}
        # Every remaining step records, per column, the transformer it fitted.
        for step in all_steps[1:]:
            for col, transformer in step.transformers_.items():
                self.all_processing_steps_[col].append(transformer)
        return result

    def transform(self, X):
        """Transform dataframe.

        Parameters
        ----------
        X : dataframe of shape (n_samples, n_features)
            Input data to transform.

        Returns
        -------
        dataframe
            The transformed input.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the ``Cleaner`` has not been fitted yet.
        """
        # Fail with the standard scikit-learn error rather than an
        # AttributeError on the private ``_pipeline`` attribute.
        check_is_fitted(self, "all_outputs_")
        return self._pipeline.transform(X)

    def fit(self, X, y=None):
        """Fit transformer.

        Parameters
        ----------
        X : dataframe of shape (n_samples, n_features)
            Input data to transform.

        y : array-like, shape (n_samples,) or (n_samples, n_outputs) or None, \
                default=None
            Target values for supervised learning (None for unsupervised
            transformations).

        Returns
        -------
        self : Cleaner
            The fitted estimator.
        """
        self.fit_transform(X, y=y)
        return self

    def get_feature_names_out(self, input_features=None):
        """Return the column names of the output of ``transform`` as an array of strings.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Ignored.

        Returns
        -------
        ndarray of str
            The column names.
        """
        check_is_fitted(self, "all_outputs_")
        return np.asarray(self.all_outputs_)


class TableVectorizer(TransformerMixin, BaseEstimator):
    """Transform a dataframe to a numeric (vectorized) representation.

    This transformer preprocesses the given dataframe by first cleaning the data
    to ensure consistent numerical dtypes (float32). The TableVectorizer will
    automatically convert to float32 any string column that contains only numerical
    information.
    Then it encodes each column with an encoder suitable for its dtype. Categorical
    features are encoded differently depending on their cardinality.

    .. note::

        The ``specific_transformers`` parameter will be removed in a future
        version of ``skrub``, when better utilities for building complex
        pipelines are introduced.

    Parameters
    ----------
    cardinality_threshold : int, default=40
        String and categorical columns with a number of unique values strictly smaller
        than this threshold are handled by the transformer ``low_cardinality``, the rest
        are handled by the transformer ``high_cardinality``.

    low_cardinality : transformer, "passthrough" or "drop", \
            default=OneHotEncoder instance
        The transformer for string or categorical columns with strictly fewer than
        ``cardinality_threshold`` unique values. By default, we use a
        :class:`~sklearn.preprocessing.OneHotEncoder` that ignores unknown categories
        and drops one of the transformed columns if the feature contains only 2
        categories.

    high_cardinality : transformer, "passthrough" or "drop", default=StringEncoder instance
        The transformer for string or categorical columns with at least
        ``cardinality_threshold`` unique values. The default is a
        :class:`~skrub.StringEncoder` with 30 components (30 output columns for each
        input).

        .. versionchanged:: 0.6.0
           The default ``high_cardinality`` encoder has been changed from
           :class:`~skrub.GapEncoder` to :class:`~skrub.StringEncoder`.

    numeric : transformer, "passthrough" or "drop", default="passthrough"
        The transformer for numeric columns (floats, ints, booleans).

    datetime : transformer, "passthrough" or "drop", default=DatetimeEncoder instance
        The transformer for date and datetime columns. By default, we use a
        :class:`~skrub.DatetimeEncoder`.

    specific_transformers : list of (transformer, list of column names) pairs, \
            default=()
        Override the categories above for the given columns and force using the
        specified transformer. This disables any preprocessing usually done by
        the TableVectorizer; the columns are passed to the transformer without
        any modification. A column is not allowed to appear twice in
        ``specific_transformers``. Using ``specific_transformers`` provides
        similar functionality to what is offered by scikit-learn's
        :class:`~sklearn.compose.ColumnTransformer`.

    drop_null_fraction : float or None, default=1.0
        Fraction of null above which the column is dropped. If `drop_null_fraction` is
        set to ``1.0``, the column is dropped if it contains only
        nulls or NaNs (this is the default behavior). If `drop_null_fraction` is a
        number in ``[0.0, 1.0)``, the column is dropped if the fraction of nulls
        is strictly larger than `drop_null_fraction`. If `drop_null_fraction` is ``None``,
        this selection is disabled: no columns are dropped based on the number
        of null values they contain.

    drop_if_constant : bool, default=False
        If set to true, drop columns that contain a single unique value. Note that
        missing values are considered as one additional distinct value.

    drop_if_unique : bool, default=False
        If set to true, drop columns that contain only unique values, i.e., the number
        of unique values is equal to the number of rows in the column. Numeric columns
        are never dropped.

    datetime_format : str, default=None
        The format to use when parsing dates. If None, the format is inferred.

    n_jobs : int, default=None
        Number of jobs to run in parallel.
        ``None`` means 1 unless in a joblib ``parallel_backend`` context.
        ``-1`` means using all processors.

    Attributes
    ----------
    transformers_ : dict
        Maps the name of each column to the fitted transformer that was applied
        to it.

    column_to_kind_ : dict
        Maps each column name to the kind (``"high_cardinality"``,
        ``"low_cardinality"``, ``"specific"``, etc.) it was assigned.

    kind_to_columns_ : dict
        The reverse of ``column_to_kind_``: maps each kind of column
        (``"high_cardinality"``, ``"low_cardinality"``, etc.) to a list of
        column names. For example ``kind_to_columns_['datetime']`` contains the
        names of all datetime columns.

    input_to_outputs_ : dict
        Maps the name of each input column to the names of the corresponding
        output columns.

    output_to_input_ : dict
        The reverse of ``input_to_outputs_``: maps the name of each output
        column to the name of the column in the input dataframe from which it
        was derived.

    all_processing_steps_ : dict
        Maps the name of each column to a list of all the processing steps that were
        applied to it. Those steps may include some pre-processing transformations such
        as converting strings to datetimes or numbers, the main transformer (e.g. the
        :class:`~skrub.DatetimeEncoder`), and a post-processing step casting the main
        transformer's output to :obj:`numpy.float32`. See the "Examples" section below
        for details.

    feature_names_in_ : list of str
        The names of the input columns, after applying some cleaning (casting
        all column names to strings and deduplication).

    n_features_in_ : int
        The number of input columns.

    all_outputs_ : list of str
        The names of the output columns.

    See Also
    --------
    tabular_pipeline :
        A function that accepts a scikit-learn estimator and creates a pipeline
        combining a ``TableVectorizer``, optional missing value imputation and
        the provided estimator.

    Cleaner :
        Preprocesses each column of a dataframe with consistency checks and
        sanitization, e.g., of null values or dates.

    ApplyToCols :
        Apply a given transformer separately to each column in a selection of columns.
        Useful to complement the default heuristics of the ``TableVectorizer``.

    ApplyToFrame :
        Apply a given transformer jointly to all columns in a selection of columns.
        Useful to complement the default heuristics of the ``TableVectorizer``.

    Notes
    -----
    The TableVectorizer applies a different transformation to each of several kinds of columns:

    - `numeric`: floats, integers, and booleans.
    - `datetime`: datetimes and dates.
    - `low_cardinality`: string and categorical columns with a count
      of unique values smaller than a given threshold (40 by default). Category encoding
      schemes such as one-hot encoding, ordinal encoding etc. are typically appropriate
      for columns with few unique values.
    - `high_cardinality`: string and categorical columns with many
      unique values, such as free-form text. Such columns have so many distinct values
      that it is not possible to assign a distinct representation to each: the dimension
      would be too large and there would be too few examples of each category.
      Representations designed for text, such as topic modelling
      (:class:`~skrub.GapEncoder`) or locality-sensitive hashing
      (:class:`~skrub.MinHash`) are more appropriate.

    .. note::

        Transformations are applied **independently on each column**. A
        different transformer instance is used for each column separately;
        multivariate transformations are therefore not supported.

    The transformer for each kind of column can be configured with the corresponding
    parameter. A transformer is expected to be a `compatible scikit-learn transformer
    <https://scikit-learn.org/stable/glossary.html#term-transformer>`_. Special-cased
    strings ``"drop"`` and ``"passthrough"`` are accepted as well, to indicate to drop
    the columns or to pass them through untransformed, respectively.

    Additionally, it is possible to specify transformers for specific columns,
    overriding the categorization described above. This is done by providing a
    list of pairs ``(transformer, list_of_columns)`` as the
    ``specific_transformers`` parameter.

    Examples
    --------
    >>> from skrub import TableVectorizer
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     'A': ['one', 'two', 'two', 'three'],
    ...     'B': ['02/02/2024', '23/02/2024', '12/03/2024', '13/03/2024'],
    ...     'C': ['1.5', 'N/A', '12.2', 'N/A'],
    ... })
    >>> df
           A           B     C
    0    one  02/02/2024   1.5
    1    two  23/02/2024   N/A
    2    two  12/03/2024  12.2
    3  three  13/03/2024   N/A
    >>> df.dtypes
    A         ...
    B         ...
    C         ...
    dtype: object

    >>> vectorizer = TableVectorizer()
    >>> vectorizer.fit_transform(df)
       A_one  A_three  A_two  B_year  B_month  B_day  B_total_seconds     C
    0    1.0      0.0    0.0  2024.0      2.0    2.0     1.706832e+09   1.5
    1    0.0      0.0    1.0  2024.0      2.0   23.0     1.708646e+09   NaN
    2    0.0      0.0    1.0  2024.0      3.0   12.0     1.710202e+09  12.2
    3    0.0      1.0    0.0  2024.0      3.0   13.0     1.710288e+09   NaN

    We can inspect which outputs were created from a given column in the input
    dataframe:

    >>> vectorizer.input_to_outputs_['B']
    ['B_year', 'B_month', 'B_day', 'B_total_seconds']

    and the reverse mapping:

    >>> vectorizer.output_to_input_['B_total_seconds']
    'B'

    We can also see the encoder that was applied to a given column:

    >>> vectorizer.transformers_['B']
    DatetimeEncoder()
    >>> vectorizer.transformers_['A']
    OneHotEncoder(drop='if_binary', dtype='float32', handle_unknown='ignore',
                  sparse_output=False)
    >>> vectorizer.transformers_['A'].categories_
    [array(['one', 'three', 'two'], dtype=...)]

    We can see the columns grouped by the kind of encoder that was applied
    to them:

    >>> vectorizer.kind_to_columns_
    {'numeric': ['C'], 'datetime': ['B'], 'low_cardinality': ['A'], 'high_cardinality': [], 'specific': []}

    As well as the reverse mapping (from each column to its kind):

    >>> vectorizer.column_to_kind_
    {'C': 'numeric', 'B': 'datetime', 'A': 'low_cardinality'}

    Before applying the main transformer, the ``TableVectorizer`` applies
    several preprocessing steps, for example to detect numbers or dates that are
    represented as strings. By default, columns that contain only null values are
    dropped. Moreover, a final post-processing step is applied to all
    non-categorical columns in the encoder's output to cast them to float32.
    If ``datetime_format`` is provided, it will be used to parse all datetime
    columns.

    We can inspect all the processing steps that were applied to a given column:

    >>> vectorizer.all_processing_steps_['B']
    [CleanNullStrings(), DropUninformative(), ToDatetime(), DatetimeEncoder(), {'B_day': ToFloat(), 'B_month': ToFloat(), ...}]

    Note that as the encoder (``DatetimeEncoder()`` above) produces multiple
    columns, the last processing step is not described by a single transformer
    like the previous ones but by a mapping from column name to transformer.

    ``all_processing_steps_`` is useful to inspect the details of the
    choices made by the ``TableVectorizer`` during preprocessing, for example:

    >>> vectorizer.all_processing_steps_['B'][2]
    ToDatetime()
    >>> _.format_
    '%d/%m/%Y'

    **Transformers are applied separately to each column**

    The ``TableVectorizer`` vectorizes each column separately -- a different
    transformer is applied to each column; multivariate transformers are not
    allowed.

    >>> df_1 = pd.DataFrame(dict(A=['one', 'two'], B=['three', 'four']))
    >>> vectorizer = TableVectorizer().fit(df_1)
    >>> vectorizer.transformers_['A'] is not vectorizer.transformers_['B']
    True
    >>> vectorizer.transformers_['A'].categories_
    [array(['one', 'two'], dtype=...)]
    >>> vectorizer.transformers_['B'].categories_
    [array(['four', 'three'], dtype=...)]

    **Overriding the transformer for specific columns**

    We can also provide transformers for specific columns. In that case the
    provided transformer has full control over the associated columns; no other
    processing is applied to those columns. A column cannot appear twice in the
    ``specific_transformers``.

    .. note::

        This functionality is likely to be removed in a future version of the
        ``TableVectorizer``.

    The overrides are provided as a list of pairs:
    ``(transformer, list_of_column_names)``.

    >>> from sklearn.preprocessing import OrdinalEncoder
    >>> vectorizer = TableVectorizer(
    ...     specific_transformers=[('drop', ['A']), (OrdinalEncoder(), ['B'])]
    ... )
    >>> df
           A           B     C
    0    one  02/02/2024   1.5
    1    two  23/02/2024   N/A
    2    two  12/03/2024  12.2
    3  three  13/03/2024   N/A
    >>> vectorizer.fit_transform(df)
         B     C
    0  0.0   1.5
    1  3.0   NaN
    2  1.0  12.2
    3  2.0   NaN

    Here the column 'A' has been dropped and the column 'B' has been passed to
    the ``OrdinalEncoder`` (instead of the default choice which would have been
    ``DatetimeEncoder``).

    We can see that 'A' and 'B' are now treated as 'specific' columns:

    >>> vectorizer.column_to_kind_
    {'C': 'numeric', 'A': 'specific', 'B': 'specific'}

    Preprocessing and postprocessing steps are not applied to columns appearing
    in ``specific_transformers``. For example 'B' has not gone through
    ``ToDatetime()``:

    >>> vectorizer.all_processing_steps_
    {'A': [Drop()], 'B': [OrdinalEncoder()], 'C': [CleanNullStrings(), DropUninformative(), ToFloat(), PassThrough(), {'C': ToFloat()}]}

    Specifying several ``specific_transformers`` for the same column is not allowed.

    >>> vectorizer = TableVectorizer(
    ...     specific_transformers=[('passthrough', ['A', 'B']), ('drop', ['A'])]
    ... )

    >>> vectorizer.fit_transform(df)
    Traceback (most recent call last):
        ...
    ValueError: Column 'A' used twice in 'specific_transformers', at indices 0 and 1.
    """  # noqa: E501

    def __init__(
        self,
        *,
        cardinality_threshold=40,
        low_cardinality=LOW_CARDINALITY_TRANSFORMER,
        high_cardinality=HIGH_CARDINALITY_TRANSFORMER,
        numeric=NUMERIC_TRANSFORMER,
        datetime=DATETIME_TRANSFORMER,
        specific_transformers=(),
        drop_null_fraction=1.0,
        drop_if_constant=False,
        drop_if_unique=False,
        datetime_format=None,
        n_jobs=None,
    ):
        # Parameters are stored in the order of the signature.
        self.cardinality_threshold = cardinality_threshold

        # One encoder per kind of column. Each goes through
        # ``_utils.clone_if_default`` together with the module-level default it
        # is compared to (presumably so that instances do not share the default
        # objects -- see that helper for the exact behavior).
        self.low_cardinality = _utils.clone_if_default(
            low_cardinality, LOW_CARDINALITY_TRANSFORMER
        )
        self.high_cardinality = _utils.clone_if_default(
            high_cardinality, HIGH_CARDINALITY_TRANSFORMER
        )
        self.numeric = _utils.clone_if_default(numeric, NUMERIC_TRANSFORMER)
        self.datetime = _utils.clone_if_default(datetime, DATETIME_TRANSFORMER)

        # Per-column overrides, stored as given.
        self.specific_transformers = specific_transformers

        # Options of the cleaning steps.
        self.drop_null_fraction = drop_null_fraction
        self.drop_if_constant = drop_if_constant
        self.drop_if_unique = drop_if_unique
        self.datetime_format = datetime_format

        self.n_jobs = n_jobs

    def fit(self, X, y=None):
        """Fit the vectorizer on a dataframe.

        Parameters
        ----------
        X : dataframe of shape (n_samples, n_features)
            Input data used to fit the transformers.

        y : array-like, shape (n_samples,) or (n_samples, n_outputs) or None, \
                default=None
            Target values for supervised learning (None for unsupervised
            transformations).

        Returns
        -------
        self : TableVectorizer
            The fitted estimator.
        """
        # Fitting goes through ``fit_transform``; its output is discarded.
        _ = self.fit_transform(X, y=y)
        return self

    def fit_transform(self, X, y=None):
        """Fit transformer and transform dataframe.

        The ``specific_transformers`` parameter is validated, the internal
        pipeline is rebuilt and fitted on ``X``, and the fitted attributes
        describing the result are then stored on the estimator.

        Parameters
        ----------
        X : dataframe of shape (n_samples, n_features)
            Input data to transform.

        y : array-like of shape (n_samples,) or (n_samples, n_outputs) or None, \
                default=None
            Target values for supervised learning (None for unsupervised
            transformations).

        Returns
        -------
        dataframe
            The transformed input.

        Raises
        ------
        ValueError
            If ``specific_transformers`` is malformed or uses a column twice.
        """
        # Validate ``specific_transformers`` and record the overridden column
        # names in ``self._specific_columns``, which ``_make_pipeline`` reads.
        self._check_specific_columns()
        # Builds ``self._preprocessors``; ``self._pipeline`` used just below is
        # presumably created there as well (the end of that method should
        # confirm).
        self._make_pipeline()
        output = self._pipeline.fit_transform(X, y=y)
        self.all_outputs_ = sbd.column_names(output)
        # NOTE(review): the three helpers below are kept in this order; whether
        # they depend on each other is not visible from here.
        self._store_processing_steps()
        self._store_column_kinds()
        self._store_output_to_input()
        # for sklearn
        # The input names are those reported by the first preprocessing step
        # (the ``CheckInputDataFrame`` placed first by ``_get_preprocessors``).
        self.feature_names_in_ = self._preprocessors[0].feature_names_out_
        self.n_features_in_ = len(self.feature_names_in_)

        return output

    def transform(self, X):
        """Apply the fitted vectorizer to a dataframe.

        Parameters
        ----------
        X : dataframe of shape (n_samples, n_features)
            Input data to transform.

        Returns
        -------
        dataframe
            The transformed input.
        """
        # ``transformers_`` is used as the marker of a fitted estimator.
        check_is_fitted(self, "transformers_")
        fitted_pipeline = self._pipeline
        return fitted_pipeline.transform(X)

    def _check_specific_columns(self):
        specific_columns = {}
        for i, config in enumerate(self.specific_transformers):
            try:
                _, cols = config
                assert isinstance(cols, Iterable) and not isinstance(cols, str)
            except (ValueError, TypeError, AssertionError):
                raise ValueError(
                    "'specific_transformers' must be a list of "
                    "(transformer, list of columns) pairs. "
                    f"Got {config!r} at index {i}."
                )
            for c in cols:
                if not isinstance(c, str):
                    raise ValueError(
                        "Column names in 'specific_transformers' must be strings,"
                        f" got {c}"
                    )
                if c in specific_columns:
                    raise ValueError(
                        f"Column {c!r} used twice in 'specific_transformers', "
                        f"at indices {specific_columns[c]} and {i}."
                    )
            specific_columns.update(dict.fromkeys(cols, i))
        self._specific_columns = list(specific_columns.keys())

    def _make_pipeline(self):
        """Build the (unfitted) processing pipeline and store it on ``self``.

        Sets ``_preprocessors``, ``_encoders``, ``_named_encoders``,
        ``_specific_transformers``, ``_postprocessors`` and ``_pipeline``.
        Reads ``self._specific_columns``, so ``_check_specific_columns`` must
        have been called first.
        """

        def add_step(steps, transformer, cols, allow_reject=False):
            # Wrap ``transformer`` so it is applied column by column to
            # ``cols``, append the wrapped step to ``steps`` and return it.
            steps.append(
                wrap_transformer(
                    _check_transformer(transformer),
                    cols,
                    allow_reject=allow_reject,
                    n_jobs=self.n_jobs,
                    columnwise=True,
                )
            )
            return steps[-1]

        # Everything except the columns handed to ``specific_transformers``.
        cols = s.all() - self._specific_columns
        self._preprocessors = _get_preprocessors(
            cols=cols,
            drop_null_fraction=self.drop_null_fraction,
            drop_if_constant=self.drop_if_constant,
            drop_if_unique=self.drop_if_unique,
            n_jobs=self.n_jobs,
            add_tofloat32=True,
            datetime_format=self.datetime_format,
        )

        self._encoders = []
        self._named_encoders = {}
        # The order of this list matters: each encoder's selector excludes the
        # outputs of the encoders added before it (see below), and the last
        # entry selects with ``s.all()``.
        for name, selector in [
            ("numeric", s.numeric()),
            ("datetime", s.any_date()),
            (
                "low_cardinality",
                s.cardinality_below(self.cardinality_threshold),
            ),
            ("high_cardinality", s.all()),
        ]:
            # ``getattr(self, name)`` fetches the attribute of ``self`` that
            # has the same name as the kind (e.g. ``self.numeric``).
            # ``-`` binds tighter than ``&``, so the selector reads
            # ``cols & (selector - _created_by(...))``. The argument is
            # evaluated before ``add_step`` appends, so ``self._encoders``
            # holds only the previously added encoders at that point.
            # NOTE(review): ``_created_by`` presumably selects columns output
            # by the given steps -- confirm against its definition.
            self._named_encoders[name] = add_step(
                self._encoders,
                getattr(self, name),
                cols & selector - _created_by(*self._encoders),
            )

        # User-supplied transformers, each applied to its own list of columns.
        self._specific_transformers = []
        for specific_transformer, specific_cols in self.specific_transformers:
            add_step(self._specific_transformers, specific_transformer, specific_cols)

        # Final ``ToFloat`` step on all columns except categorical ones and
        # those excluded by ``_created_by(*self._specific_transformers)``.
        # ``allow_reject=True`` is forwarded to ``wrap_transformer``.
        self._postprocessors = []
        add_step(
            self._postprocessors,
            ToFloat(),
            s.all() - _created_by(*self._specific_transformers) - s.categorical(),
            allow_reject=True,
        )
        # Steps run in this order: preprocessors, encoders, specific
        # transformers, postprocessors.
        self._pipeline = make_pipeline(
            *self._preprocessors,
            *self._encoders,
            *self._specific_transformers,
            *self._postprocessors,
        )

    def _store_processing_steps(self):
        input_names = self._preprocessors[0].feature_names_out_
        to_outputs = {col: [col] for col in input_names}
        to_steps = {col: [] for col in input_names}
        self.transformers_ = {}
        # [1:] because CheckInputDataFrame not included in all_processing_steps_
        for step in self._preprocessors[1:]:
            for col, transformer in step.transformers_.items():
                to_steps[col].append(transformer)
        for step in self._encoders + self._specific_transformers:
            for col, transformer in step.transformers_.items():
                to_steps[col].append(transformer)
                to_outputs[col] = step.input_to_outputs_[col]
                self.transformers_[col] = transformer
        for col, outputs in to_outputs.items():
            post_proc = {
                c: t
                for c in outputs
                if (t := self._postprocessors[0].transformers_.get(c)) is not None
            }
            if post_proc:
                to_steps[col].append(ShortReprDict(post_proc))
        self.input_to_outputs_ = to_outputs
        self.all_processing_steps_ = to_steps

    def _store_column_kinds(self):
        self.kind_to_columns_ = {
            k: v.used_inputs_ for k, v in self._named_encoders.items()
        }
        self.kind_to_columns_["specific"] = self._specific_columns
        self.column_to_kind_ = {
            c: k for k, cols in self.kind_to_columns_.items() for c in cols
        }

    def _store_output_to_input(self):
        self.output_to_input_ = {
            out: input_
            for (input_, outputs) in self.input_to_outputs_.items()
            for out in outputs
        }

    def _sk_visual_block_(self):
        """Return a "parallel" ``_VisualBlock`` listing the four encoders.

        The block shows the ``numeric``, ``datetime``, ``low_cardinality`` and
        ``high_cardinality`` transformers. Once ``kind_to_columns_`` exists,
        the columns handled by each are attached as details.
        """
        kinds = ("numeric", "datetime", "low_cardinality", "high_cardinality")
        if hasattr(self, "kind_to_columns_"):
            name_details = [self.kind_to_columns_[kind] for kind in kinds]
        else:
            # ``kind_to_columns_`` is absent: no column details to show.
            name_details = None
        estimators = [getattr(self, kind) for kind in kinds]
        return _VisualBlock(
            "parallel",
            estimators,
            names=list(kinds),
            name_details=name_details,
        )

    # scikit-learn compatibility

    def _more_tags(self):
        """
        Used internally by sklearn to ease the estimator checks.
        """
        return {
            "X_types": ["2darray", "string"],
            "allow_nan": [True],
            "_xfail_checks": {
                "check_complex_data": "Passthrough complex columns as-is.",
            },
        }

    def __sklearn_tags__(self):
        """Return the parent class's tags with string and NaN input enabled."""
        estimator_tags = super().__sklearn_tags__()
        for flag in ("string", "allow_nan"):
            setattr(estimator_tags.input_tags, flag, True)
        return estimator_tags

    def get_feature_names_out(self, input_features=None):
        """Return the column names of the output of ``transform``.

        Parameters
        ----------
        input_features : array-like of str or None, default=None
            Ignored.

        Returns
        -------
        ndarray of str
            The output column names, i.e. ``all_outputs_`` converted with
            ``np.asarray``.
        """
        check_is_fitted(self, "all_outputs_")
        output_names = self.all_outputs_
        return np.asarray(output_names)
